package io.gitee.thant.utils;

import java.io.File;
import java.io.FileInputStream;
import java.io.RandomAccessFile;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

/**
 * A growable FIFO queue stored in fixed-size array segments and journalled to disk.
 *
 * <p>Every successful {@link #add(Object)} appends a line {@code id,serializedObject} to
 * {@code <path>.add}; every successful {@link #take()} appends {@code ,id} to
 * {@code <path>.take}. On construction, existing journal files are renamed to
 * {@code *.bak} and the entries that were added but never taken are re-added by a
 * background thread (see {@link #readStorage()}).
 *
 * <p>All access to the in-memory structure is guarded by a single {@link ReentrantLock}.
 */
public class SafeQueue {
	/** A queued payload together with the sequence id written to the journal files. */
	static class ItemWrap {
		long id;
		Object dat;
	}
	
	/**
	 * Ring-buffer cursors. {@code (bno, bpos)} is the segment index and slot of the oldest
	 * item; {@code (eno, epos)} is the segment index and slot where the next item is stored.
	 * The queue is empty when both cursors are equal.
	 */
	static class QueuePoint implements Cloneable {
		int bno = 0;
		int bpos = 0;
		int eno = 0;
		int epos = 0;
		
		/** Returns an independent copy of this cursor pair. */
		@Override
		protected QueuePoint clone() {
			QueuePoint obj = new QueuePoint();
			obj.bno = this.bno;
			obj.bpos = this.bpos;
			obj.eno = this.eno;
			obj.epos = this.epos;
			return obj;
		}
	}
	
	/** Sentinel returned by {@link #take()} when nothing could be taken; compared by identity. */
	private final static String emptyObj = "<=+#*SafeQueue_null*#+=>";
	
	/** Last sequence id handed out; only modified while {@link #datlock} is held. */
	private long seqno = 0;
	/** Segments of the ring buffer, each {@link #singleLength} slots long. */
	private final List<Object[]> _queueLst = new LinkedList<Object[]>();
	private QueuePoint _pt = new QueuePoint();
	
	private int singleLength = 100000;
	private String dataPath = "";
	
	private final ReentrantLock datlock = new ReentrantLock();
	
	/**
	 * Creates a queue and starts recovery of any journal left at {@code path}.
	 *
	 * @param path base path of the journal files ({@code path + ".add"} / {@code path + ".take"})
	 * @param len  number of slots per segment; values {@code <= 0} keep the default of 100000
	 */
	public SafeQueue(String path, int len) {
		if (len>0) {
			singleLength = len;
		}
		dataPath = path;
		_queueLst.add(new Object[singleLength]);
		readStorage();
	}
		
	/**
	 * Returns the number of items currently queued.
	 *
	 * @return distance from the begin cursor to the end cursor, accounting for wrap-around
	 */
	public long size() {
		datlock.lock();
		try {
			// Widen before multiplying so large segment counts cannot overflow int.
			long bpos = (long)_pt.bno*singleLength+_pt.bpos;
			long epos = (long)_pt.eno*singleLength+_pt.epos;
			long len = epos - bpos;
			return len>=0 ? len : (long)_queueLst.size()*singleLength + len;
		} finally {
			datlock.unlock();
		}
	}
	
	/**
	 * Advances one cursor of {@code pt} by a slot, wrapping to segment 0 after the last segment.
	 * The passed object is modified in place.
	 *
	 * @param pt    cursor pair to advance (callers pass a clone of the live cursors)
	 * @param endpt {@code true} to advance the end cursor, {@code false} to advance the begin cursor
	 * @return {@code pt}, or {@code null} when the end cursor would land on the begin cursor
	 *         (ring full) or when the begin cursor already equals the end cursor (ring empty)
	 */
	private QueuePoint nextPosition(QueuePoint pt, boolean endpt) {
		if (endpt) {
			if (pt.epos+1<singleLength) {
				++pt.epos;
			} else if (pt.eno+1 < _queueLst.size()) {
				pt.eno ++;
				pt.epos = 0;
			} else {
				pt.eno = 0;
				pt.epos = 0;
			}
			if (pt.bno == pt.eno && pt.bpos == pt.epos) return null;
		} else {
			if (pt.bno == pt.eno && pt.bpos == pt.epos) return null;
			if (pt.bpos+1<singleLength) {
				++pt.bpos;
			} else if (pt.bno+1 < _queueLst.size()) {
				pt.bno ++;
				pt.bpos = 0;
			} else {
				pt.bno = 0;
				pt.bpos = 0;
			}
		}
		return pt;
	}

	/**
	 * Appends {@code obj} to the queue and records it in the {@code .add} journal.
	 *
	 * @param obj payload to enqueue
	 * @return {@code true} on success, {@code false} if the journal write failed
	 *         (in which case the queue is left unchanged)
	 */
	public boolean add(Object obj) {
		datlock.lock();
		try {
			return _add(obj, true);
		} finally {
			datlock.unlock();
		}
	}
	
	/**
	 * Inserts one segment into a full ring so that the FIFO order of the stored items is
	 * preserved, and returns the cursors to use after the pending item has been written to
	 * the current end slot. Must only be called when advancing the end cursor would hit the
	 * begin cursor.
	 */
	private QueuePoint grow() {
		Object[] fresh = new Object[singleLength];
		// Inserting right after the end segment never shifts the index of the end segment.
		int insertAt = _pt.eno + 1;
		_queueLst.add(insertAt, fresh);

		QueuePoint pt = _pt.clone();
		if (_pt.bpos > 0) {
			// Ring is full with begin mid-segment, so begin and end share one segment:
			// slots [0, bpos) hold the newest items and [bpos, singleLength) the oldest.
			// Move the oldest part into the new segment so the free space opens up
			// between the end cursor and the begin cursor.
			Object[] shared = _queueLst.get(_pt.bno);
			for (int i = _pt.bpos; i < singleLength; ++i) {
				fresh[i] = shared[i];
				shared[i] = null;
			}
			pt.bno = insertAt;
			pt.eno = _pt.eno;
			pt.epos = _pt.bpos;
		} else {
			// Begin sits at the start of a segment and end at the last slot of the
			// preceding one (cyclically); the new segment goes between them.
			if (_pt.bno >= insertAt) {
				pt.bno = _pt.bno + 1;
			}
			pt.eno = insertAt;
			pt.epos = 0;
		}
		return pt;
	}
	
	/**
	 * Enqueues {@code obj}; the caller must hold {@link #datlock}.
	 *
	 * @param obj    payload to enqueue
	 * @param record whether to append the item to the {@code .add} journal
	 * @return {@code false} if {@code record} is set and the journal write failed
	 */
	private boolean _add(Object obj, boolean record) {
		ItemWrap item = new ItemWrap();
		item.dat = obj;
		item.id = ++seqno;
		
		// Journal first: if the write fails the in-memory structure must stay untouched.
		if (record) {
			try {
				StringBuilder sb = new StringBuilder();
				sb.append(item.id).append(',');
				SerializeUtil.writeObject(sb, obj);
				sb.append("\r\n");
				FileUtil.writeByteArrayToFile(new File(dataPath+".add"), sb.toString().getBytes("UTF-8"), true);
			} catch (Exception e) {
				e.printStackTrace();
				return false;
			}
		}

		int slotNo = _pt.eno;
		int slotPos = _pt.epos;
		QueuePoint pt = nextPosition(_pt.clone(), true);
		if (null == pt) {
			// Ring is full: add a segment without disturbing item order.
			pt = grow();
		}
		_queueLst.get(slotNo)[slotPos] = item;
		
		_pt = pt;
		return true;
	}
	
	/**
	 * Dequeues the oldest item; the caller must hold {@link #datlock}.
	 *
	 * @return the payload, or the sentinel {@code emptyObj} when the queue is empty or the
	 *         {@code .take} journal could not be written (the item then stays queued)
	 */
	private Object _take() {
		if (_pt.bno == _pt.eno && _pt.bpos == _pt.epos) {
			_pt.bno = _pt.eno = _pt.bpos = _pt.epos = 0;
			return emptyObj;
		}
		Object[] segment = _queueLst.get(_pt.bno);
		int pos = _pt.bpos;
		ItemWrap item = (ItemWrap)segment[pos];
		QueuePoint pt = nextPosition(_pt.clone(), false);

		try {
			StringBuilder sb = new StringBuilder();
			sb.append(',').append(item.id);
			FileUtil.writeByteArrayToFile(new File(dataPath+".take"), sb.toString().getBytes("UTF-8"), true);
		} catch (Exception e) {
			e.printStackTrace();
			return emptyObj;
		}

		// Release the slot so the taken payload is not kept reachable by the buffer.
		segment[pos] = null;
		_pt = pt;
		return item.dat;
	}

	/**
	 * Removes and returns the oldest item, recording the removal in the {@code .take} journal.
	 *
	 * @return the payload, or a sentinel for which {@link #isValid(Object)} returns {@code false}
	 */
	public Object take() {
		datlock.lock();
		try {
			return _take();
		} finally {
			datlock.unlock();
		}
	}
	
	/**
	 * Tells whether a value returned by {@link #take()} is a real payload.
	 *
	 * @param dat value returned by {@link #take()}
	 * @return {@code false} only for the internal "nothing taken" sentinel
	 */
	public static boolean isValid(Object dat) {
		// Identity comparison is intentional: only the sentinel instance itself is invalid.
		return dat != emptyObj;
	}
	
	/** Moves {@code from} to {@code to} if it exists; a failure is reported but not propagated. */
	private static void moveIfExists(File from, File to) {
		if (!from.exists()) {
			return;
		}
		try {
			FileUtil.moveFile(from, to);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	/**
	 * Recovers items from a previous run. Existing {@code .add} / {@code .take} journals are
	 * renamed to {@code <name>.<millis>.bak}; a background thread then re-adds every journalled
	 * item whose id does not appear in the take journal.
	 *
	 * <p>Sequence ids are not restored: replayed items go through {@link #add(Object)} and
	 * therefore receive fresh ids in the new journal.
	 */
	public synchronized void readStorage() {
		long t = System.currentTimeMillis();
		final String waitfilepath = dataPath+".add";
		final String donefilepath = dataPath+".take";
		final String newwaitfilepath = waitfilepath+"."+t+".bak";
		final String newdonefilepath = donefilepath+"."+t+".bak";

		File oldWait = new File(waitfilepath);
		File oldDone = new File(donefilepath);
		if (!oldWait.exists() && !oldDone.exists()) {
			return;
		}
		// Move each file on its own so a missing or locked one does not block the other.
		moveIfExists(oldWait, new File(newwaitfilepath));
		moveIfExists(oldDone, new File(newdonefilepath));
		
		new Thread("readStorage") {
			@Override
			public void run() {
				File waitfile = new File(newwaitfilepath);
				File donefile = new File(newdonefilepath);
				
				System.out.println("read "+donefilepath);
				Set<String> done = new HashSet<String>();
				if (donefile.exists()) {
					try {
						String taked = FileUtil.readFileToString(donefile, "UTF-8");
						if (taked.length()>0) {
							String[] idA = StringUtil.split(taked, ",");
							LogHelper.info(idA.length+"行");
							for (int i=0; i<idA.length; ++i) {
								done.add(idA[i]);
							}
						}
					} catch (Exception e) {
						// Without the done list, already-taken items will be replayed; make that visible.
						e.printStackTrace();
					}
				}
				System.out.println("done="+done.size()+
					"\r\n"+
					"read "+waitfilepath);
				
				if (!waitfile.exists()) {
					return;
				}
				
				// The journal is streamed line by line; an earlier line-iterator based
				// variant was dropped because it used more memory.
				try (FileInputStream inputStream = new FileInputStream(waitfile.getPath());
						Scanner sc = new Scanner(inputStream, "UTF-8")) {
					long sz = 0;
					while (sc.hasNextLine()) {
						String line = sc.nextLine();
						String[] partA = StringUtil.split(line, ",");
						if (partA.length>1) {
							System.out.print("id="+partA[0]);
							if (!done.contains(partA[0])) {
								// NOTE(review): platform default charset is used here although the
								// journal is written as UTF-8 — confirm what SerializeUtil.readObject expects.
								Object obj = SerializeUtil.readObject(partA[1].getBytes());
								if (SerializeUtil.isValid(obj)) {
									boolean ok = false;
									System.out.print(",add=");
									// Hold the lock across add and size so the reported size matches this add.
									datlock.lock();
									try {
										ok = add(obj);
										if (ok) {
											sz = size();
										}
									} finally {
										datlock.unlock();
									}
									System.out.println(ok+","+sz);
								}
							}
						}
						// Pause in proportion to the last observed queue size.
						Thread.sleep(sz/100);
					}
					// Scanner hides read errors; report one so a truncated replay is noticed.
					if (sc.ioException() != null) {
						sc.ioException().printStackTrace();
					}
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt();
					e.printStackTrace();
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}.start();
	}
	
	/**
	 * If the queue is empty, shrinks it to a single segment, resets the cursors and truncates
	 * both journal files.
	 *
	 * @return {@code true} if the queue was empty and has been reset (even when truncating the
	 *         journal files failed), {@code false} if it still holds items
	 */
	public synchronized boolean resetIfEmpty() {
		datlock.lock();
		try {
			if (0 != size()) {
				return false;
			}
			// Drop segments from the front until only the last one remains.
			while (_queueLst.size()>1) {
				_queueLst.remove(0);
			}
			_pt.bno = 0;
			_pt.bpos = 0;
			_pt.eno = 0;
			_pt.epos = 0;

			try (RandomAccessFile waitfile = new RandomAccessFile(dataPath+".add", "rw");
					RandomAccessFile donefile = new RandomAccessFile(dataPath+".take", "rw")) {
				waitfile.setLength(0);
				donefile.setLength(0);
			} catch (Exception e) {
				e.printStackTrace();
			}
			return true;
		} finally {
			datlock.unlock();
		}
	}
	
	/** Returns a one-line summary: journal path, item count, segment size, segment count and cursors. */
	@Override
	public String toString() {
		datlock.lock();
		try {
			return String.format("path:%s,size:%d,blocksize:%d,list:%d,point:[(%d,%d)(%d,%d)]"
				,dataPath, size(), singleLength, _queueLst.size()
				,_pt.bno, _pt.bpos, _pt.eno, _pt.epos);
		} finally {
			datlock.unlock();
		}
	}
}
